home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2009 February / PCWFEB09.iso / Software / Resources / Chat & Communication / Digsby build 37 / digsby_setup.exe / lib / pyxmpp / streambase.pyo (.txt) < prev    next >
Python Compiled Bytecode  |  2008-10-13  |  20KB  |  751 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyo (Python 2.5)
  3.  
  4. __revision__ = '$Id: streambase.py 652 2006-08-27 19:41:15Z jajcus $'
  5. __docformat__ = 'restructuredtext en'
  6. import libxml2
  7. import socket
  8. import os
  9. import time
  10. import random
  11. import threading
  12. import errno
  13. import logging
  14. from pyxmpp import xmlextra
  15. from pyxmpp.expdict import ExpiringDictionary
  16. from pyxmpp.utils import to_utf8
  17. from pyxmpp.stanza import Stanza
  18. from pyxmpp.error import StreamErrorNode
  19. from pyxmpp.iq import Iq
  20. from pyxmpp.presence import Presence
  21. from pyxmpp.message import Message
  22. from pyxmpp.jid import JID
  23. from pyxmpp import resolver
  24. from pyxmpp.stanzaprocessor import StanzaProcessor
  25. from pyxmpp.exceptions import StreamError, StreamEncryptionRequired, HostMismatch, ProtocolError
  26. from pyxmpp.exceptions import FatalStreamError, StreamParseError, StreamAuthenticationError
  27. STREAM_NS = 'http://etherx.jabber.org/streams'
  28. BIND_NS = 'urn:ietf:params:xml:ns:xmpp-bind'
  29.  
  30. def stanza_factory(xmlnode, stream = None):
  31.     if xmlnode.name == 'iq':
  32.         return Iq(xmlnode, stream = stream)
  33.     
  34.     if xmlnode.name == 'message':
  35.         return Message(xmlnode, stream = stream)
  36.     
  37.     if xmlnode.name == 'presence':
  38.         return Presence(xmlnode, stream = stream)
  39.     else:
  40.         return Stanza(xmlnode, stream = stream)
  41.  
  42.  
  43. class StreamBase(StanzaProcessor, xmlextra.StreamHandler):
  44.     
  45.     def __init__(self, default_ns, extra_ns = (), keepalive = 0, owner = None):
  46.         StanzaProcessor.__init__(self)
  47.         xmlextra.StreamHandler.__init__(self)
  48.         self.default_ns_uri = default_ns
  49.         if extra_ns:
  50.             self.extra_ns_uris = extra_ns
  51.         else:
  52.             self.extra_ns_uris = []
  53.         self.keepalive = keepalive
  54.         self._reader_lock = threading.Lock()
  55.         self.process_all_stanzas = False
  56.         self.port = None
  57.         self._reset()
  58.         self.owner = owner
  59.         self._StreamBase__logger = logging.getLogger('pyxmpp.Stream')
  60.  
  61.     
  62.     def _reset(self):
  63.         self.doc_in = None
  64.         self.doc_out = None
  65.         self.socket = None
  66.         self._reader = None
  67.         self.addr = None
  68.         self.default_ns = None
  69.         self.extra_ns = { }
  70.         self.stream_ns = None
  71.         self._reader = None
  72.         self.ioreader = None
  73.         self.me = None
  74.         self.peer = None
  75.         self.skip = False
  76.         self.stream_id = None
  77.         self._iq_response_handlers = ExpiringDictionary()
  78.         self._iq_get_handlers = { }
  79.         self._iq_set_handlers = { }
  80.         self._message_handlers = []
  81.         self._presence_handlers = []
  82.         self.eof = False
  83.         self.initiator = None
  84.         self.features = None
  85.         self.authenticated = False
  86.         self.peer_authenticated = False
  87.         self.auth_method_used = None
  88.         self.version = None
  89.         self.last_keepalive = False
  90.  
  91.     
  92.     def _connect_socket(self, sock, to = None):
  93.         self.eof = 0
  94.         self.socket = sock
  95.         if to:
  96.             self.peer = JID(to)
  97.         else:
  98.             self.peer = None
  99.         self.initiator = 1
  100.         self._send_stream_start()
  101.         self._make_reader()
  102.  
  103.     
  104.     def connect(self, addr, port, service = None, to = None):
  105.         self.lock.acquire()
  106.         
  107.         try:
  108.             return self._connect(addr, port, service, to)
  109.         finally:
  110.             self.lock.release()
  111.  
  112.  
  113.     
  114.     def _connect(self, addr, port, service = None, to = None):
  115.         if to is None:
  116.             to = str(addr)
  117.         
  118.         if service is not None:
  119.             self.state_change('resolving srv', (addr, service))
  120.             addrs = resolver.resolve_srv(addr, service)
  121.             if not addrs:
  122.                 addrs = [
  123.                     (addr, port)]
  124.             
  125.         else:
  126.             addrs = [
  127.                 (addr, port)]
  128.         msg = None
  129.         for addr, port in addrs:
  130.             if type(addr) in (str, unicode):
  131.                 self.state_change('resolving', addr)
  132.             
  133.             s = None
  134.             for res in resolver.getaddrinfo(addr, port, 0, socket.SOCK_STREAM):
  135.                 (family, socktype, proto, _unused, sockaddr) = res
  136.                 
  137.                 try:
  138.                     s = socket.socket(family, socktype, proto)
  139.                     self.state_change('connecting', sockaddr)
  140.                     s.connect(sockaddr)
  141.                     self.state_change('connected', sockaddr)
  142.                 except socket.error:
  143.                     msg = None
  144.                     self._StreamBase__logger.debug('Connect to %r failed' % (sockaddr,))
  145.                     if s:
  146.                         s.close()
  147.                         s = None
  148.                         continue
  149.                     continue
  150.  
  151.                 break
  152.             
  153.             if s:
  154.                 break
  155.                 continue
  156.         
  157.         if not s:
  158.             if msg:
  159.                 raise socket.error, msg
  160.             else:
  161.                 raise FatalStreamError, 'Cannot connect'
  162.         
  163.         self.addr = addr
  164.         self.port = port
  165.         self._connect_socket(s, to)
  166.         self.last_keepalive = time.time()
  167.  
  168.     
  169.     def accept(self, sock, myname):
  170.         self.lock.acquire()
  171.         
  172.         try:
  173.             return self._accept(sock, myname)
  174.         finally:
  175.             self.lock.release()
  176.  
  177.  
  178.     
  179.     def _accept(self, sock, myname):
  180.         self.eof = 0
  181.         (self.socket, addr) = sock.accept()
  182.         self._StreamBase__logger.debug('Connection from: %r' % (addr,))
  183.         (self.addr, self.port) = addr
  184.         if myname:
  185.             self.me = JID(myname)
  186.         else:
  187.             self.me = None
  188.         self.initiator = 0
  189.         self._make_reader()
  190.         self.last_keepalive = time.time()
  191.  
  192.     
  193.     def disconnect(self):
  194.         self.lock.acquire()
  195.         
  196.         try:
  197.             return self._disconnect()
  198.         finally:
  199.             self.lock.release()
  200.  
  201.  
  202.     
  203.     def _disconnect(self):
  204.         if self.doc_out:
  205.             self._send_stream_end()
  206.         
  207.  
  208.     
  209.     def _post_connect(self):
  210.         pass
  211.  
  212.     
  213.     def _post_auth(self):
  214.         pass
  215.  
  216.     
  217.     def state_change(self, state, arg):
  218.         self._StreamBase__logger.debug('State: %s: %r' % (state, arg))
  219.  
  220.     
  221.     def close(self):
  222.         self.lock.acquire()
  223.         
  224.         try:
  225.             return self._close()
  226.         finally:
  227.             self.lock.release()
  228.  
  229.  
  230.     
  231.     def _close(self):
  232.         self._disconnect()
  233.         if self.doc_in:
  234.             self.doc_in = None
  235.         
  236.         if self.features:
  237.             self.features = None
  238.         
  239.         self._reader = None
  240.         self.stream_id = None
  241.         if self.socket:
  242.             self.socket.close()
  243.         
  244.         self._reset()
  245.  
  246.     
  247.     def _make_reader(self):
  248.         self._reader = xmlextra.StreamReader(self)
  249.  
  250.     
  251.     def stream_start(self, doc):
  252.         self.doc_in = doc
  253.         self._StreamBase__logger.debug('input document: %r' % (self.doc_in.serialize(),))
  254.         
  255.         try:
  256.             r = self.doc_in.getRootElement()
  257.             if r.ns().getContent() != STREAM_NS:
  258.                 self._send_stream_error('invalid-namespace')
  259.                 raise FatalStreamError, 'Invalid namespace.'
  260.         except libxml2.treeError:
  261.             self._send_stream_error('invalid-namespace')
  262.             raise FatalStreamError, "Couldn't get the namespace."
  263.  
  264.         self.version = r.prop('version')
  265.         if self.version and self.version != '1.0':
  266.             self._send_stream_error('unsupported-version')
  267.             raise FatalStreamError, 'Unsupported protocol version.'
  268.         
  269.         to_from_mismatch = 0
  270.         if self.initiator:
  271.             self.stream_id = r.prop('id')
  272.             peer = r.prop('from')
  273.             if peer:
  274.                 peer = JID(peer)
  275.             
  276.             if self.peer:
  277.                 if peer and peer != self.peer:
  278.                     self._StreamBase__logger.debug('peer hostname mismatch: %r != %r' % (peer, self.peer))
  279.                     to_from_mismatch = 1
  280.                 
  281.             else:
  282.                 self.peer = peer
  283.         else:
  284.             to = r.prop('to')
  285.             if to:
  286.                 to = self.check_to(to)
  287.                 if not to:
  288.                     self._send_stream_error('host-unknown')
  289.                     raise FatalStreamError, 'Bad "to"'
  290.                 
  291.                 self.me = JID(to)
  292.             
  293.             self._send_stream_start(self.generate_id())
  294.             self._send_stream_features()
  295.             self.state_change('fully connected', self.peer)
  296.             self._post_connect()
  297.         if not self.version:
  298.             self.state_change('fully connected', self.peer)
  299.             self._post_connect()
  300.         
  301.         if to_from_mismatch:
  302.             raise HostMismatch
  303.         
  304.  
  305.     
  306.     def stream_end(self, _unused):
  307.         self._StreamBase__logger.debug('Stream ended')
  308.         self.eof = 1
  309.         if self.doc_out:
  310.             self._send_stream_end()
  311.         
  312.         if self.doc_in:
  313.             self.doc_in = None
  314.             self._reader = None
  315.             if self.features:
  316.                 self.features = None
  317.             
  318.         
  319.         self.state_change('disconnected', self.peer)
  320.  
  321.     
  322.     def stanza_start(self, doc, node):
  323.         pass
  324.  
  325.     
  326.     def stanza(self, _unused, node):
  327.         self._process_node(node)
  328.  
  329.     
  330.     def error(self, descr):
  331.         raise StreamParseError, descr
  332.  
  333.     
  334.     def _send_stream_end(self):
  335.         self.doc_out.getRootElement().addContent(' ')
  336.         s = self.doc_out.getRootElement().serialize(encoding = 'UTF-8')
  337.         end = s.rindex('<')
  338.         
  339.         try:
  340.             self._write_raw(s[end:])
  341.         except (IOError, SystemError, socket.error):
  342.             e = None
  343.             self._StreamBase__logger.debug('Sending stream closing tag failed:' + str(e))
  344.  
  345.         self.doc_out.freeDoc()
  346.         self.doc_out = None
  347.         if self.features:
  348.             self.features = None
  349.         
  350.  
  351.     
  352.     def _send_stream_start(self, sid = None):
  353.         if self.doc_out:
  354.             raise StreamError, 'Stream start already sent'
  355.         
  356.         self.doc_out = libxml2.newDoc('1.0')
  357.         root = self.doc_out.newChild(None, 'stream', None)
  358.         self.stream_ns = root.newNs(STREAM_NS, 'stream')
  359.         root.setNs(self.stream_ns)
  360.         self.default_ns = root.newNs(self.default_ns_uri, None)
  361.         for prefix, uri in self.extra_ns:
  362.             self.extra_ns[uri] = root.newNs(uri, prefix)
  363.         
  364.         if self.peer and self.initiator:
  365.             root.setProp('to', self.peer.as_utf8())
  366.         
  367.         if self.me and not (self.initiator):
  368.             root.setProp('from', self.me.as_utf8())
  369.         
  370.         root.setProp('version', '1.0')
  371.         if sid:
  372.             root.setProp('id', sid)
  373.             self.stream_id = sid
  374.         
  375.         sr = self.doc_out.serialize(encoding = 'UTF-8')
  376.         self._write_raw(sr[:sr.find('/>')] + '>')
  377.  
  378.     
  379.     def _send_stream_error(self, condition):
  380.         if not self.doc_out:
  381.             self._send_stream_start()
  382.         
  383.         e = StreamErrorNode(condition)
  384.         e.xmlnode.setNs(self.stream_ns)
  385.         self._write_raw(e.serialize())
  386.         e.free()
  387.         self._send_stream_end()
  388.  
  389.     
  390.     def _restart_stream(self):
  391.         self._reader = None
  392.         self.doc_out = None
  393.         self.doc_in = None
  394.         self.features = None
  395.         if self.initiator:
  396.             self._send_stream_start(self.stream_id)
  397.         
  398.         self._make_reader()
  399.  
  400.     
  401.     def _make_stream_features(self):
  402.         root = self.doc_out.getRootElement()
  403.         features = root.newChild(root.ns(), 'features', None)
  404.         return features
  405.  
  406.     
  407.     def _send_stream_features(self):
  408.         self.features = self._make_stream_features()
  409.         self._write_raw(self.features.serialize(encoding = 'UTF-8'))
  410.  
  411.     
  412.     def write_raw(self, data):
  413.         self.lock.acquire()
  414.         
  415.         try:
  416.             return self._write_raw(data)
  417.         finally:
  418.             self.lock.release()
  419.  
  420.  
  421.     
  422.     def _write_raw(self, data):
  423.         logging.getLogger('pyxmpp.Stream.out').debug('OUT: %r', data)
  424.         
  425.         try:
  426.             self.socket.send(data)
  427.         except (IOError, OSError, socket.error):
  428.             e = None
  429.             raise FatalStreamError('IO Error: ' + str(e))
  430.  
  431.  
  432.     
  433.     def _write_node(self, xmlnode):
  434.         if self.eof and not (self.socket) or not (self.doc_out):
  435.             self._StreamBase__logger.debug('Dropping stanza: %r' % (xmlnode,))
  436.             return None
  437.         
  438.         xmlnode = xmlnode.docCopyNode(self.doc_out, 1)
  439.         self.doc_out.addChild(xmlnode)
  440.         
  441.         try:
  442.             ns = xmlnode.ns()
  443.         except libxml2.treeError:
  444.             ns = None
  445.  
  446.         if ns and ns.content == xmlextra.COMMON_NS:
  447.             xmlextra.replace_ns(xmlnode, ns, self.default_ns)
  448.         
  449.         s = xmlextra.safe_serialize(xmlnode)
  450.         self._write_raw(s)
  451.         xmlnode.unlinkNode()
  452.         xmlnode.freeNode()
  453.  
  454.     
  455.     def send(self, stanza):
  456.         self.lock.acquire()
  457.         
  458.         try:
  459.             return self._send(stanza)
  460.         finally:
  461.             self.lock.release()
  462.  
  463.  
  464.     
  465.     def _send(self, stanza):
  466.         if not self.version:
  467.             
  468.             try:
  469.                 err = stanza.get_error()
  470.             except ProtocolError:
  471.                 err = None
  472.  
  473.             if err:
  474.                 err.downgrade()
  475.             
  476.         
  477.         self.fix_out_stanza(stanza)
  478.         self._write_node(stanza.xmlnode)
  479.  
  480.     
  481.     def idle(self):
  482.         self.lock.acquire()
  483.         
  484.         try:
  485.             return self._idle()
  486.         finally:
  487.             self.lock.release()
  488.  
  489.  
  490.     
  491.     def _idle(self):
  492.         self._iq_response_handlers.expire()
  493.         if not (self.socket) or self.eof:
  494.             return None
  495.         
  496.         now = time.time()
  497.         if self.keepalive and now - self.last_keepalive >= self.keepalive:
  498.             self._write_raw(' ')
  499.             self.last_keepalive = now
  500.         
  501.  
  502.     
  503.     def fileno(self):
  504.         self.lock.acquire()
  505.         
  506.         try:
  507.             return self.socket.fileno()
  508.         finally:
  509.             self.lock.release()
  510.  
  511.  
  512.     
  513.     def loop(self, timeout):
  514.         self.lock.acquire()
  515.         
  516.         try:
  517.             while not (self.eof) and self.socket is not None:
  518.                 act = self._loop_iter(timeout)
  519.                 if not act:
  520.                     self._idle()
  521.                     continue
  522.         finally:
  523.             self.lock.release()
  524.  
  525.  
  526.     
  527.     def loop_iter(self, timeout):
  528.         self.lock.acquire()
  529.         
  530.         try:
  531.             return self._loop_iter(timeout)
  532.         finally:
  533.             self.lock.release()
  534.  
  535.  
  536.     
  537.     def _loop_iter(self, timeout):
  538.         import select as select
  539.         self.lock.release()
  540.         
  541.         try:
  542.             (ifd, _unused, efd) = select.select([
  543.                 self.socket], [], [
  544.                 self.socket], timeout)
  545.         except select.error:
  546.             e = None
  547.             if e.args[0] != errno.EINTR:
  548.                 raise 
  549.             
  550.             ifd = []
  551.             _unused = []
  552.             efd = []
  553.         finally:
  554.             self.lock.acquire()
  555.  
  556.         if self.socket in ifd or self.socket in efd:
  557.             self._process()
  558.             return True
  559.         else:
  560.             return False
  561.  
  562.     
  563.     def process(self):
  564.         self.lock.acquire()
  565.         
  566.         try:
  567.             self._process()
  568.         finally:
  569.             self.lock.release()
  570.  
  571.  
  572.     
  573.     def _process(self):
  574.         
  575.         try:
  576.             
  577.             try:
  578.                 self._read()
  579.             except (xmlextra.error,):
  580.                 e = None
  581.                 self._StreamBase__logger.exception('Exception during read()')
  582.                 raise StreamParseError(unicode(e))
  583.             except:
  584.                 raise 
  585.  
  586.         except (IOError, OSError, socket.error):
  587.             e = None
  588.             self.close()
  589.             raise FatalStreamError('IO Error: ' + str(e))
  590.         except (FatalStreamError, KeyboardInterrupt, SystemExit):
  591.             e = None
  592.             self.close()
  593.             raise 
  594.  
  595.  
  596.     
  597.     def _read(self):
  598.         self._StreamBase__logger.debug('StreamBase._read(), socket: %r', self.socket)
  599.         if self.eof:
  600.             return None
  601.         
  602.         
  603.         try:
  604.             r = self.socket.recv(1024)
  605.         except socket.error:
  606.             e = None
  607.             if e.args[0] != errno.EINTR:
  608.                 raise 
  609.             
  610.             return None
  611.  
  612.         self._feed_reader(r)
  613.  
  614.     
  615.     def _feed_reader(self, data):
  616.         logging.getLogger('pyxmpp.Stream.in').debug('IN: %r', data)
  617.         if data:
  618.             
  619.             try:
  620.                 r = self._reader.feed(data)
  621.                 while r:
  622.                     r = self._reader.feed('')
  623.                 if r is None:
  624.                     self.eof = 1
  625.                     self.disconnect()
  626.             except StreamParseError:
  627.                 self._send_stream_error('xml-not-well-formed')
  628.                 raise 
  629.             except:
  630.                 None<EXCEPTION MATCH>StreamParseError
  631.             
  632.  
  633.         None<EXCEPTION MATCH>StreamParseError
  634.         self.eof = 1
  635.         self.disconnect()
  636.         if self.eof:
  637.             self.stream_end(None)
  638.         
  639.  
  640.     
  641.     def _process_node(self, xmlnode):
  642.         ns_uri = xmlnode.ns().getContent()
  643.         if ns_uri == 'http://etherx.jabber.org/streams':
  644.             self._process_stream_node(xmlnode)
  645.             return None
  646.         
  647.         if ns_uri == self.default_ns_uri:
  648.             stanza = stanza_factory(xmlnode, self)
  649.             self.lock.release()
  650.             
  651.             try:
  652.                 self.process_stanza(stanza)
  653.             finally:
  654.                 self.lock.acquire()
  655.                 stanza.free()
  656.  
  657.         else:
  658.             self._StreamBase__logger.debug('Unhandled node: %r' % (xmlnode.serialize(),))
  659.  
  660.     
  661.     def _process_stream_node(self, xmlnode):
  662.         if xmlnode.name == 'error':
  663.             e = StreamErrorNode(xmlnode)
  664.             self.lock.release()
  665.             
  666.             try:
  667.                 self.process_stream_error(e)
  668.             finally:
  669.                 self.lock.acquire()
  670.                 e.free()
  671.  
  672.             return None
  673.         elif xmlnode.name == 'features':
  674.             self._StreamBase__logger.debug('Got stream features')
  675.             self._StreamBase__logger.debug('Node: %r' % (xmlnode,))
  676.             self.features = xmlnode.copyNode(1)
  677.             self.doc_in.addChild(self.features)
  678.             self._got_features()
  679.             return None
  680.         
  681.         self._StreamBase__logger.debug('Unhandled stream node: %r' % (xmlnode.serialize(),))
  682.  
  683.     
  684.     def process_stream_error(self, err):
  685.         self._StreamBase__logger.debug('Unhandled stream error: condition: %s %r' % (err.get_condition().name, err.serialize()))
  686.  
  687.     
  688.     def check_to(self, to):
  689.         if to != self.me:
  690.             return None
  691.         
  692.         return to
  693.  
  694.     
  695.     def generate_id(self):
  696.         return '%i-%i-%s' % (os.getpid(), time.time(), str(random.random())[2:])
  697.  
  698.     
  699.     def _got_features(self):
  700.         ctxt = self.doc_in.xpathNewContext()
  701.         ctxt.setContextNode(self.features)
  702.         ctxt.xpathRegisterNs('stream', STREAM_NS)
  703.         ctxt.xpathRegisterNs('bind', BIND_NS)
  704.         bind_n = None
  705.         
  706.         try:
  707.             bind_n = ctxt.xpathEval('bind:bind')
  708.         finally:
  709.             ctxt.xpathFreeContext()
  710.  
  711.         if self.authenticated:
  712.             if bind_n:
  713.                 self.bind(self.me.resource)
  714.             else:
  715.                 self.state_change('authorized', self.me)
  716.         
  717.  
  718.     
  719.     def bind(self, resource):
  720.         iq = Iq(stanza_type = 'set')
  721.         q = iq.new_query(BIND_NS, u'bind')
  722.         if resource:
  723.             q.newTextChild(None, 'resource', to_utf8(resource))
  724.         
  725.         self.state_change('binding', resource)
  726.         self.set_response_handlers(iq, self._bind_success, self._bind_error)
  727.         self.send(iq)
  728.         iq.free()
  729.  
  730.     
  731.     def _bind_success(self, stanza):
  732.         jid_n = stanza.xpath_eval('bind:bind/bind:jid', {
  733.             'bind': BIND_NS })
  734.         if jid_n:
  735.             self.me = JID(jid_n[0].getContent().decode('utf-8'))
  736.         
  737.         self.state_change('authorized', self.me)
  738.  
  739.     
  740.     def _bind_error(self, stanza):
  741.         raise FatalStreamError, 'Resource binding failed'
  742.  
  743.     
  744.     def connected(self):
  745.         if self.doc_in and self.doc_out and not (self.eof):
  746.             return True
  747.         else:
  748.             return False
  749.  
  750.  
  751.